package org.databandtech.mysql2clickhouse;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.databandtech.mysql2clickhouse.sink.SinkToClickhouse;
import org.databandtech.mysql2clickhouse.source.SourceFromMySQLByTuple5;

/**
 * Entry point of a Flink streaming job that wires a MySQL-backed source to a
 * ClickHouse sink.
 *
 * <p>The job adds a {@link SourceFromMySQLByTuple5} source configured with the
 * MySQL constants below, prints every record of the stream, and attaches a
 * {@link SinkToClickhouse} configured with the ClickHouse constants below.
 *
 * <p>NOTE(review): connection URLs and credentials are hard-coded; consider
 * loading them from external configuration before using this outside a local
 * development setup.
 */
public class MysqlToClickhouse {

	// MySQL (source) settings
	static final String URL = "jdbc:mysql://127.0.0.1:3306/databand?useUnicode=true&characterEncoding=utf-8&useSSL=false";
	static final String USER = "root";
	static final String PASS = "mysql";
	static final String SQL = "SELECT title,status,vid,cover_id,url from b_video ORDER BY id DESC LIMIT 100";
	// Column names handed to the source; listed in the same order as the SELECT list in SQL.
	static final String[] COLUMNS_READ = new String[] { "title", "status", "vid", "cover_id", "url" };

	// ClickHouse (sink) settings
	static final String CHURL = "jdbc:clickhouse://localhost:8123/databand";
	static final String CHUSER = "root";
	static final String CHPASS = "mysql";
	// NOTE(review): only the "INSERT INTO <table>" prefix is given here; presumably
	// SinkToClickhouse completes the statement - confirm against the sink.
	static final String CHSQL = "INSERT INTO test.writer";

	/**
	 * Builds the source -> print -> sink pipeline and runs it under the job name
	 * {@code "ok-clickhouse"}.
	 *
	 * @param args command-line arguments (not used)
	 * @throws IllegalStateException if the job execution fails; the original
	 *         exception is attached as the cause so the failure is visible to
	 *         whoever launched the job instead of being silently swallowed
	 */
	public static void main(String[] args) {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Fixed-delay restart strategy: 3 restart attempts, 3000 ms between attempts.
		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));

		// ######### Read data #########
		// Presumably each Tuple5 carries (title, status, vid, cover_id, url) - confirm against the source.
		DataStreamSource<Tuple5<String, Integer, String, String, String>> sourceStream = env
				.addSource(new SourceFromMySQLByTuple5(URL, USER, PASS, COLUMNS_READ, SQL));

		// Print every record of the stream (useful while debugging).
		sourceStream.print();

		// Optional map step (currently disabled):
		//SingleOutputStreamOperator<String> dataStream = sourceStream.map(new MapTransformation());

		// ######### Write data #########
		sourceStream.addSink(new SinkToClickhouse(CHURL, CHUSER, CHPASS, CHSQL));

		try {
			env.execute("ok-clickhouse");
		} catch (Exception e) {
			// Propagate the failure: swallowing it here would let main() return
			// normally and make a failed job look like a successful run.
			throw new IllegalStateException("Flink job 'ok-clickhouse' failed", e);
		}
	}

}
